package com.stars.easyms.schedule.core;

import com.stars.easyms.schedule.DistributedTask;
import com.stars.easyms.schedule.DistributedTaskContext;
import com.stars.easyms.schedule.bean.DbScheduleSubTask;
import com.stars.easyms.schedule.client.DistributedTaskExecutionContent;
import com.stars.easyms.schedule.client.DistributedTaskExecutionContentWrapper;
import com.stars.easyms.schedule.client.DistributedTaskExecutionResult;
import com.stars.easyms.schedule.client.DistributedTaskExecutionResultWrapper;
import com.stars.easyms.schedule.enums.MQ;
import com.stars.easyms.schedule.factory.RestTemplateFactory;
import com.stars.easyms.schedule.impl.DistributedTaskContextImpl;
import com.stars.easyms.schedule.bean.DbScheduleSubTaskRecord;
import com.stars.easyms.schedule.enums.ExecuteMode;
import com.stars.easyms.schedule.mq.MQMessageSender;
import com.stars.easyms.schedule.factory.MQMessageSenderFactory;
import com.stars.easyms.schedule.enums.TaskType;
import com.stars.easyms.schedule.compatibility.QuartzCompatibility;
import com.stars.easyms.schedule.enums.TaskStatus;
import com.stars.easyms.schedule.record.AsynInsertSubTaskRecord;
import com.stars.easyms.schedule.service.DistributedScheduleService;
import com.stars.easyms.schedule.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import javax.sql.DataSource;
import java.lang.reflect.InvocationTargetException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Task executor: a reusable worker that runs one distributed sub task at a time.
 *
 * <p>A sub task is handed over with {@link #setExecutableSubTask(DbScheduleSubTask)} and processed by
 * {@link #run()}. Depending on the sub task's {@link TaskType} it is either published to a message
 * queue, posted to a remote URL, or executed in-process inside a dedicated transaction. When the
 * sub task is finished the executor records the outcome, clears its current task and signals the
 * task catcher so that the next sub task can be assigned.
 *
 * <p>Thread-safety: {@link #run()} and {@link #setExecutableSubTask(DbScheduleSubTask)} are mutually
 * exclusive through an internal lock. The public getters do not take that lock; the fields they read
 * are {@code volatile} so that they observe up-to-date values.
 *
 * @author guoguifang
 */
public final class DistributedTaskExecutor implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(DistributedTaskExecutor.class);

    /**
     * Transaction manager shared by all executors. It is resolved lazily in
     * {@link #createTransaction()} using double-checked locking, which is only safe under the Java
     * memory model when the field is {@code volatile}.
     */
    private static volatile DataSourceTransactionManager transactionManager;

    /**
     * Transaction status of the native task running on the current thread.
     */
    private static final ThreadLocal<TransactionStatus> TRANSACTION_STATUS = new ThreadLocal<>();

    /**
     * Unique number of this task executor.
     */
    private final int id;

    /**
     * Number of tasks this executor has finished since it was started. Only written while
     * {@link #mainLock} is held; {@code volatile} because {@link #getCompletedTaskCount()} reads it
     * without the lock.
     */
    private volatile long completedTaskCount;

    /**
     * Stopwatch recording the timing of every task run by this executor since it was started.
     */
    private final StopWatch stopWatch = new StopWatch();

    /**
     * The sub task currently assigned to this executor, or {@code null} when the executor is free.
     * Only written while {@link #mainLock} is held; {@code volatile} because
     * {@link #getExecutableSubTask()} reads it without the lock.
     */
    private volatile DbScheduleSubTask executableSubTask;

    /**
     * Display name of the sub task currently being executed.
     */
    private String executableSubTaskName;

    /**
     * The throwable caught while executing the current sub task, if any.
     */
    private Throwable throwable;

    /**
     * Lock guarding the assignment and the execution of the sub task.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Creates a task executor.
     *
     * @param id unique number of the executor
     */
    DistributedTaskExecutor(int id) {
        super();
        this.id = id;
    }

    /**
     * Executes the currently assigned sub task (if any) while holding the executor lock.
     */
    @Override
    public void run() {
        mainLock.lock();
        try {
            execute();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Starts the stopwatch for the assigned sub task and dispatches it according to its task type.
     * Does nothing when no sub task is assigned.
     */
    private void execute() {
        if (executableSubTask == null) {
            return;
        }

        // Build the sub task name: taskId-batchNo-partitionCount-partitionIndex
        executableSubTaskName = StringUtils.join(new Object[]{executableSubTask.getTaskId(), executableSubTask.getBatchNo(), executableSubTask.getPartitionCount(), executableSubTask.getPartitionIndex()}, "-");

        // Start timing this sub task
        stopWatch.start(executableSubTaskName);

        // Log the start time of the sub task
        if (logger.isInfoEnabled()) {
            logger.info("The distributed task [{}] begin to execute, start time: {}!", executableSubTaskName, DateUtil.getDateStrByMillisecond(new Date(stopWatch.getStartTimeMillis())));
        }

        // Dispatch by task type; anything that is neither MQ nor REMOTE is run as a native task
        TaskType taskType = TaskType.forCode(executableSubTask.getTaskType());
        if (TaskType.MQ == taskType) {
            executeMQTask();
        } else if (TaskType.REMOTE == taskType) {
            executeRemoteTask();
        } else {
            executeNativeTask();
        }
    }

    /**
     * Executes an MQ task: sends the execution content to the configured destination through
     * ActiveMQ or, for any other MQ type, through RocketMQ.
     */
    private void executeMQTask() {
        // The mq connection address and the destination are mandatory
        String mqConnection = executableSubTask.getMqConnection();
        String mqDestination = executableSubTask.getMqDestination();
        if (StringUtils.isBlank(mqConnection) || StringUtils.isBlank(mqDestination)) {
            executeError("Unable to get mq connection or destination/topic of executable task!");
            return;
        }

        // Obtain the matching sender and send the mq message
        try {
            MQMessageSender mqMessageSender;
            if (MQ.ACTIVEMQ == MQ.forCode(executableSubTask.getMqType())) {
                String mqUser = executableSubTask.getMqUser();
                String mqPassword = executableSubTask.getMqPassword();
                if (StringUtils.isBlank(mqUser) || StringUtils.isBlank(mqPassword)) {
                    executeError("Unable to get mq user or password of executable task!");
                    return;
                }
                mqMessageSender = MQMessageSenderFactory.getActiveMQMessageSender(mqConnection, mqUser, mqPassword);
            } else {
                String mqProducerGroupName = executableSubTask.getMqProducerGroupName();
                if (StringUtils.isBlank(mqProducerGroupName)) {
                    executeError("Unable to get mq producer group name of executable task!");
                    return;
                }
                mqMessageSender = MQMessageSenderFactory.getRocketMQMessageSender(mqConnection, mqProducerGroupName);
            }
            boolean sent = mqMessageSender.convertAndSend(mqDestination, getRemoteDistributedTaskExecutionContent());
            if (sent) {
                // A successfully sent message finishes the executor's part; the status is left untouched
                executeFinish(true);
            } else {
                executeError("Mq message convert and send failure!");
            }
        } catch (Throwable t) {
            executeError(t);
        }
    }

    /**
     * Executes a remote task: posts the execution content to the sub task's remote URL and evaluates
     * the returned execution result.
     */
    private void executeRemoteTask() {
        // The remote url is mandatory
        String remoteUrl = executableSubTask.getRemoteUrl();
        if (StringUtils.isBlank(remoteUrl)) {
            executeError("Unable to get remote url of executable task!");
            return;
        }

        // Call the remote url through the rest template
        try {
            ResponseEntity<DistributedTaskExecutionResult> response = RestTemplateFactory.getRestTemplate().postForEntity(remoteUrl, getRemoteDistributedTaskExecutionContent(), DistributedTaskExecutionResult.class);
            if (response.getStatusCode() != HttpStatus.OK) {
                // A response did arrive, so this is not a timeout: report the status actually received
                executeError("RemoteUrl '" + remoteUrl + "' returned unexpected http status " + response.getStatusCode() + "!");
                return;
            }
            DistributedTaskExecutionResult executionResult = response.getBody();
            if (executionResult == null) {
                executeError("Remote distributed task execution result is null!");
                return;
            }
            DistributedTaskExecutionResultWrapper resultWrapper = new DistributedTaskExecutionResultWrapper(executionResult);
            if (!resultWrapper.isExecutionSuccess()) {
                executeError(resultWrapper.getErrorMsg());
                return;
            }
            executeComplete();
        } catch (Throwable t) {
            executeError(t);
        }
    }

    /**
     * Executes a native task: resolves the task bean and runs it inside a new transaction. The bean
     * must implement {@link DistributedTask} or, for quartz compatibility, {@code org.quartz.Job}.
     */
    private void executeNativeTask() {
        // Resolve the bean that carries the task logic
        Object executableBean = getExecutableBean();
        if (executableBean == null) {
            executeError("Unable to get bean of executable task!");
            return;
        }

        ExecuteMode executeMode = ExecuteMode.DISTRIBUTED;
        // Beans not implementing DistributedTask are only accepted when they are quartz jobs
        // (quartz is currently the only compatibility mode)
        if (!(executableBean instanceof DistributedTask)) {
            if (!QuartzCompatibility.isQuartzJob(executableBean)) {
                executeError("The bean must implement the interface DistributedTask or org.quartz.Job!");
                return;
            }
            executeMode = ExecuteMode.QUARTZ;
        }

        try {
            // Wrap the whole sub task in one transaction to keep it transactionally consistent.
            // The transaction is opened inside the try block so that a failure to obtain it is
            // reported as a task error instead of escaping run() and leaving the executor occupied.
            createTransaction();

            // Task execution logic
            if (executeMode == ExecuteMode.QUARTZ) {
                QuartzCompatibility.invoke(executableBean, executableSubTask);
            } else {
                DistributedTask distributedTask = (DistributedTask) executableBean;
                DistributedTaskContext distributedTaskContext = new DistributedTaskContextImpl(executableSubTask);
                distributedTask.execute(distributedTaskContext);
            }

            // The sub task succeeded: finish the transaction and mark the sub task as complete
            commitTransaction();
            executeComplete();
        } catch (InvocationTargetException e) {
            // The task failed: roll back and report the exception thrown by the invoked target
            rollbackTransaction();
            executeError(e.getTargetException());
        } catch (Throwable t) {
            // The task failed: roll back and report the failure
            rollbackTransaction();
            executeError(t);
        } finally {
            TRANSACTION_STATUS.remove();
        }
    }

    /**
     * Assigns a sub task to this executor.
     *
     * @param executableSubTask the sub task to execute
     * @return {@code true} if the sub task was accepted, {@code false} if the executor already holds one
     */
    boolean setExecutableSubTask(DbScheduleSubTask executableSubTask) {
        mainLock.lock();
        try {
            if (this.executableSubTask == null) {
                this.executableSubTask = executableSubTask;
                return true;
            }
            return false;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Resolves the bean that executes the task: first by bean id and, if that yields nothing, by
     * bean class.
     *
     * @return the bean, or {@code null} if it could not be resolved
     */
    private Object getExecutableBean() {
        Object bean = null;
        String beanId = executableSubTask.getBeanId();
        if (StringUtils.isNotBlank(beanId)) {
            bean = ApplicationContextHolder.getBean(beanId);
        }
        if (bean == null) {
            bean = getExecutableBeanByClassName(executableSubTask.getBeanClass());
        }
        return bean;
    }

    /**
     * Resolves a bean of the given class from the application context, falling back to creating a
     * new instance through the class's no-argument constructor.
     *
     * @param beanClassName fully qualified class name, may be blank
     * @return the bean, or {@code null} if the name is blank, the class cannot be loaded, or the
     *         instance cannot be created
     */
    private Object getExecutableBeanByClassName(String beanClassName) {
        if (StringUtils.isBlank(beanClassName)) {
            return null;
        }
        Class<?> beanClass = ReflectUtil.forNameWithNoException(beanClassName);
        if (beanClass == null) {
            return null;
        }
        Object bean = ApplicationContextHolder.getBean(beanClass);
        if (bean == null) {
            try {
                // Class.newInstance() is deprecated; go through the no-argument constructor instead
                bean = beanClass.getDeclaredConstructor().newInstance();
            } catch (Exception e) {
                logger.error("New instance of Class [{}] failure!", beanClassName, e);
            }
        }
        return bean;
    }

    /**
     * Marks the sub task as failed with the given error message and finishes it.
     *
     * @param errorMessage the error message stored on the sub task
     */
    private void executeError(String errorMessage) {
        // Leave a trace in the log, consistent with executeError(Throwable)
        logger.error("The sub task [{}] execute failure! {}", executableSubTaskName, errorMessage);
        executableSubTask.setErrorMessage(errorMessage);
        executableSubTask.setStatus(TaskStatus.ERROR.getCode());
        executeFinish(false);
    }

    /**
     * Marks the sub task as failed because of the given throwable and finishes it. The throwable is
     * logged in full; only a simplified stack trace is stored on the sub task.
     *
     * @param throwable the failure cause
     */
    private void executeError(Throwable throwable) {
        this.throwable = throwable;
        logger.error("The sub task [{}] execute failure!", executableSubTaskName, throwable);
        executableSubTask.setErrorMessage("The sub task execute failure! Simple information is " + ThrowableUtil.getSimpleStackTrace(throwable) + " , for more information, please see logs!");
        executableSubTask.setStatus(TaskStatus.ERROR.getCode());
        executeFinish(false);
    }

    /**
     * Marks the sub task as complete and finishes it.
     */
    private void executeComplete() {
        executableSubTask.setStatus(TaskStatus.COMPLETE.getCode());
        executeFinish(false);
    }

    /**
     * Finishes the current sub task: stops the stopwatch, records the outcome, resets the executor
     * and signals the task catcher.
     *
     * <p>For non-MQ tasks the sub task (status and error message already set by the caller) is
     * written to the database; if at least one row was updated, an execution record is queued and
     * the scheduler is signalled. For MQ tasks the sub task itself is not updated; only an execution
     * record is queued.
     *
     * @param isMqTaskFinish {@code true} when called after an mq message was sent successfully
     */
    private void executeFinish(boolean isMqTaskFinish) {

        // Stop timing
        stopWatch.stop();
        String endTime = DateUtil.getDateStrByMillisecond(new Date(stopWatch.getLastTaskInfo().getStopTimeMillis()));
        String consumingTime = TimeUtil.formatTime(stopWatch.getLastTaskUsedTimeMillis(), TimeUtil.ENGLISH, TimeUtil.MILLISECOND);

        if (isMqTaskFinish) {
            // Mq tasks: the sub task status is not modified here
            logger.info("The distributed task [{}] mq message send successfully, end time：{}! Total time-consuming: {}!", executableSubTaskName, endTime, consumingTime);
            // Record the sub task execution asynchronously
            AsynInsertSubTaskRecord.add(getDbScheduleSubTaskRecord(true));
        } else {
            // Non-mq tasks: persist the sub task
            int updateCount = DistributedScheduleService.getInstance().updateDbScheduleSubTaskById(executableSubTask);
            if (updateCount > 0) {
                logger.info("The distributed task [{}] execute completely, status change for --> '{}', end time：{}! Total time-consuming: {}!", executableSubTaskName, TaskStatus.forCode(executableSubTask.getStatus()), endTime, consumingTime);
                // Record the sub task execution asynchronously
                AsynInsertSubTaskRecord.add(getDbScheduleSubTaskRecord(false));

                // Notify the scheduler so that it can update the main task status
                DistributedScheduler.getSingleInstance().signalIdle();
            } else {
                logger.error("The distributed task [{}] execute completely, but the update database failed because of a database throwable or the task has been executed by other threads!", executableSubTaskName);
            }
        }

        // Count the task and reset the executor (always runs under mainLock, so there is a single writer)
        this.completedTaskCount++;
        this.executableSubTask = null;
        this.executableSubTaskName = null;
        this.throwable = null;

        // Notify the task catcher that this executor can take the next sub task
        DistributedScheduleCatcher.getSingleInstance().signalBlock();
    }

    /**
     * Builds the execution content sent to the remote side, used for both rest and mq tasks.
     *
     * @return the execution content populated from the current sub task
     */
    private DistributedTaskExecutionContent getRemoteDistributedTaskExecutionContent() {
        DistributedTaskExecutionContentWrapper contentWrapper = new DistributedTaskExecutionContentWrapper();
        contentWrapper.setId(executableSubTask.getId());
        contentWrapper.setVersion(executableSubTask.getVersion());
        contentWrapper.setTaskId(executableSubTask.getTaskId());
        contentWrapper.setTaskName(executableSubTask.getTaskName());
        contentWrapper.setBatchNo(executableSubTask.getBatchNo());
        contentWrapper.setPartitionCount(executableSubTask.getPartitionCount());
        contentWrapper.setPartitionIndex(executableSubTask.getPartitionIndex());
        contentWrapper.setParameters(MapUtil.parse(executableSubTask.getParameters()));
        contentWrapper.setMqCallback(executableSubTask.getMqCallback());
        return contentWrapper.getObject();
    }

    /**
     * Builds the execution record of the current sub task. Must be called after the stopwatch was
     * stopped and before the executor is reset.
     *
     * @param isMqTaskFinish {@code true} for an mq task, in which case finish time and used time are
     *                       left unset
     * @return the execution record
     */
    private DbScheduleSubTaskRecord getDbScheduleSubTaskRecord(boolean isMqTaskFinish) {
        DbScheduleSubTaskRecord record = new DbScheduleSubTaskRecord();
        record.setBatchNo(executableSubTask.getBatchNo());
        record.setParentId(executableSubTask.getParentId());
        record.setSubTaskId(executableSubTask.getId());
        record.setPartitionCount(executableSubTask.getPartitionCount());
        record.setPartitionIndex(executableSubTask.getPartitionIndex());
        record.setFireTime(new Timestamp(stopWatch.getLastTaskInfo().getStartTimeMillis()));
        record.setStatus(executableSubTask.getStatus());
        record.setErrorMessage(executableSubTask.getErrorMessage());
        record.setThrowable(throwable);
        record.setExecuteServerIp(DistributedScheduleManager.getSingleInstance().getDbScheduleServerConfig().getIp());
        record.setExecuteServerPort(DistributedScheduleManager.getSingleInstance().getDbScheduleServerConfig().getPort());
        record.setExecuteThreadId(this.id);
        if (!isMqTaskFinish) {
            record.setFinishTime(new Timestamp(stopWatch.getLastTaskInfo().getStopTimeMillis()));
            record.setUsedTime(stopWatch.getLastTaskInfo().getUsedTimeMillis());
        }
        return record;
    }

    /**
     * @return the unique number of this executor
     */
    public int getId() {
        return this.id;
    }

    /**
     * @return the sub task currently assigned to this executor, or {@code null} if there is none
     */
    public DbScheduleSubTask getExecutableSubTask() {
        return this.executableSubTask;
    }

    /**
     * @return the number of tasks this executor has finished since it was started
     */
    public long getCompletedTaskCount() {
        return this.completedTaskCount;
    }

    /**
     * @return the stopwatch holding the timing information of this executor's tasks
     */
    public StopWatch getStopWatch() {
        return this.stopWatch;
    }

    /**
     * Opens a new transaction ({@code PROPAGATION_REQUIRES_NEW}) and stores its status for the
     * current thread. On first use the transaction manager is taken from the application context
     * or, if none is registered there, created from the context's {@link DataSource}.
     */
    private void createTransaction() {
        // Read the volatile field once; the local is what is used on the fast path
        DataSourceTransactionManager manager = transactionManager;
        if (manager == null) {
            synchronized (DistributedTaskExecutor.class) {
                manager = transactionManager;
                if (manager == null) {
                    manager = (DataSourceTransactionManager) ApplicationContextHolder.getBean(DataSourceTransactionManager.class);
                    if (manager == null) {
                        manager = new DataSourceTransactionManager(ApplicationContextHolder.getApplicationContext().getBean(DataSource.class));
                    }
                    transactionManager = manager;
                }
            }
        }
        TRANSACTION_STATUS.set(manager.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW)));
    }

    /**
     * Completes the current thread's transaction if it is still open: rolls it back when it was
     * marked rollback-only, commits it otherwise.
     */
    private void commitTransaction() {
        final TransactionStatus transactionStatus = TRANSACTION_STATUS.get();
        if (transactionStatus == null || transactionStatus.isCompleted()) {
            return;
        }
        // NOTE(review): a rollback-only transaction is rolled back silently and the caller still
        // marks the sub task as complete - confirm that this is intended.
        if (transactionStatus.isRollbackOnly()) {
            transactionManager.rollback(transactionStatus);
        } else {
            transactionManager.commit(transactionStatus);
        }
    }

    /**
     * Rolls back the current thread's transaction if one exists and is still open.
     */
    private void rollbackTransaction() {
        final TransactionStatus transactionStatus = TRANSACTION_STATUS.get();
        if (transactionStatus != null && !transactionStatus.isCompleted()) {
            transactionManager.rollback(transactionStatus);
        }
    }

}
